/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.extensions.sql.example;



import com.bff.gaia.unified.sdk.Pipeline;

import com.bff.gaia.unified.sdk.extensions.sql.SqlTransform;

import com.bff.gaia.unified.sdk.extensions.sql.example.model.Customer;

import com.bff.gaia.unified.sdk.extensions.sql.example.model.Order;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.options.PipelineOptionsFactory;

import com.bff.gaia.unified.sdk.transforms.Create;

import com.bff.gaia.unified.sdk.transforms.MapElements;

import com.bff.gaia.unified.sdk.transforms.SimpleFunction;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.PCollectionTuple;

import com.bff.gaia.unified.sdk.values.Row;

import com.bff.gaia.unified.sdk.values.TupleTag;



import javax.annotation.Nullable;



/**

 * This example uses Unified SQL DSL to query a data pipeline with Java objects in it.

 *

 * <p>Run the example from the Unified source root with

 *

 * <pre>

 *   ./gradlew :unified-sdks-java-extensions-sql:runPojoExample

 * </pre>

 *

 * <p>The above command executes the example locally using direct runner. Running the pipeline in

 * other runners require additional setup and are out of scope of the SQL examples. Please consult

 * Unified documentation on how to run pipelines.

 *

 * <p>This example models a scenario of customers buying goods.

 *

 * <ul>

 *   <li>{@link Customer} represents a customer

 *   <li>{@link Order} represents an order by a customer

 * </ul>

 */

class UnifiedSqlPojoExample {

  public static void main(String[] args) {

    Pipeline pipeline = createPipeline(args);



    // First step is to get PCollections of source objects.

    // In this example we create them directly in memory using Create.of().

    //

    // In real world such PCollections will likely be obtained from some other source,

    // e.g. a database or a text file. This process is not specific to Unified SQL,

    // please consult Unified programming guide for details.



    PCollection<Customer> customers = loadCustomers(pipeline);

    PCollection<Order> orders = loadOrders(pipeline);



    // Example 1. Run a simple query over java objects:

    PCollection<Row> customersFromWonderland =

        customers.apply(

            SqlTransform.query(

                "SELECT id, name "

                    + " FROM PCOLLECTION "

                    + " WHERE countryOfResidence = 'Wonderland'"));



    // Output the results of the query:

    customersFromWonderland.apply(logRecords(": is from Wonderland"));



    // Example 2. Query the results of the first query:

    PCollection<Row> totalInWonderland =

        customersFromWonderland.apply(SqlTransform.query("SELECT COUNT(id) FROM PCOLLECTION"));



    // Output the results of the query:

    totalInWonderland.apply(logRecords(": total customers in Wonderland"));



    // Example 3. Query multiple PCollections of Java objects:

    PCollection<Row> ordersByGrault =

        PCollectionTuple.of(new TupleTag<>("customers"), customers)

            .and(new TupleTag<>("orders"), orders)

            .apply(

                SqlTransform.query(

                    "SELECT customers.name, ('order id:' || CAST(orders.id AS VARCHAR))"

                        + " FROM orders "

                        + "   JOIN customers ON orders.customerId = customers.id"

                        + " WHERE customers.name = 'Grault'"));



    // Output the results of the query:

    ordersByGrault.apply(logRecords(": ordered by 'Grault'"));



    pipeline.run().waitUntilFinish();

  }



  private static MapElements<Row, Void> logRecords(String suffix) {

    return MapElements.via(

        new SimpleFunction<Row, Void>() {

          @Override

          public @Nullable

		  Void apply(Row input) {

            System.out.println(input.getValues() + suffix);

            return null;

          }

        });

  }



  private static Pipeline createPipeline(String[] args) {

    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);

    return Pipeline.create(options);

  }



  private static PCollection<Customer> loadCustomers(Pipeline pipeline) {

    return pipeline.apply(

        Create.of(

            new Customer(1, "Foo", "Wonderland"),

            new Customer(2, "Bar", "Super Kingdom"),

            new Customer(3, "Baz", "Wonderland"),

            new Customer(4, "Grault", "Wonderland"),

            new Customer(5, "Qux", "Super Kingdom")));

  }



  private static PCollection<Order> loadOrders(Pipeline pipeline) {

    return pipeline.apply(

        Create.of(

            new Order(1, 5),

            new Order(2, 2),

            new Order(3, 1),

            new Order(4, 3),

            new Order(5, 1),

            new Order(6, 5),

            new Order(7, 4),

            new Order(8, 4),

            new Order(9, 1)));

  }

}